/*
* This file is part of Project SkyFire https://www.projectskyfire.org.
* See LICENSE.md file for Copyright information
*/

#ifndef _DATABASEWORKERPOOL_H
#define _DATABASEWORKERPOOL_H

#include <ace/Thread_Mutex.h>

#include "AdhocStatement.h"
#include "Callback.h"
#include "Common.h"
#include "DatabaseWorker.h"
#include "Log.h"
#include "MySQLConnection.h"
#include "PreparedStatement.h"
#include "QueryHolder.h"
#include "QueryResult.h"
#include "Transaction.h"

#define MIN_MYSQL_SERVER_VERSION 50100u
#define MIN_MYSQL_CLIENT_VERSION 50100u

class PingOperation : public SQLOperation
{
    //! Operation for idle delaythreads
    bool Execute()
    {
        m_conn->Ping();
        return true;
    }
};

template <class T>
class DatabaseWorkerPool
{
public:
    /* Activity state */
    DatabaseWorkerPool() : _queue(new ACE_Activation_Queue()), _connectionInfo(NULL)
    {
        memset(_connectionCount, 0, sizeof(_connectionCount));
        _connections.resize(IDX_SIZE);

        WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
        WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "SkyFire does not support MySQL versions below 5.1");
    }

    ~DatabaseWorkerPool()
    {
    }

    bool Open(const char* host, const char* port, const char* user, const char* password, const char* database, uint8 async_threads, uint8 synch_threads)
    {
        bool res = true;
        _connectionInfo = new MySQLConnectionInfo(host, port, user, password, database);

        SF_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.",
            GetDatabaseName(), async_threads, synch_threads);

        //! Open asynchronous connections (delayed operations)
        _connections[IDX_ASYNC].resize(async_threads);
        for (uint8 i = 0; i < async_threads; ++i)
        {
            T* t = new T(_queue, *_connectionInfo);
            res &= t->Open();
            if (res) // only check mysql version if connection is valid
                WPFatal(mysql_get_server_version(t->GetHandle()) >= MIN_MYSQL_SERVER_VERSION, "SkyFire does not support MySQL versions below 5.1");
            _connections[IDX_ASYNC][i] = t;
            ++_connectionCount[IDX_ASYNC];
        }

        //! Open synchronous connections (direct, blocking operations)
        _connections[IDX_SYNCH].resize(synch_threads);
        for (uint8 i = 0; i < synch_threads; ++i)
        {
            T* t = new T(*_connectionInfo);
            res &= t->Open();
            _connections[IDX_SYNCH][i] = t;
            ++_connectionCount[IDX_SYNCH];
        }

        if (res)
            SF_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(),
                (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC]));
        else
            SF_LOG_ERROR("sql.driver", "DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile "
                "for specific errors.", GetDatabaseName());
        return res;
    }

    bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads)
    {
        bool res = true;
        _connectionInfo = new MySQLConnectionInfo(infoString);

        SF_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.",
            GetDatabaseName(), async_threads, synch_threads);

        //! Open asynchronous connections (delayed operations)
        _connections[IDX_ASYNC].resize(async_threads);
        for (uint8 i = 0; i < async_threads; ++i)
        {
            T* t = new T(_queue, *_connectionInfo);
            res &= t->Open();
            if (res) // only check mysql version if connection is valid
                WPFatal(mysql_get_server_version(t->GetHandle()) >= MIN_MYSQL_SERVER_VERSION, "Skyfire does not support MySQL versions below 5.1");
            _connections[IDX_ASYNC][i] = t;
            ++_connectionCount[IDX_ASYNC];
        }

        //! Open synchronous connections (direct, blocking operations)
        _connections[IDX_SYNCH].resize(synch_threads);
        for (uint8 i = 0; i < synch_threads; ++i)
        {
            T* t = new T(*_connectionInfo);
            res &= t->Open();
            _connections[IDX_SYNCH][i] = t;
            ++_connectionCount[IDX_SYNCH];
        }

        if (res)
            SF_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(),
                (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC]));
        else
            SF_LOG_ERROR("sql.driver", "DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile "
                "for specific errors.", GetDatabaseName());
        return res;
    }

    void Close()
    {
        SF_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName());

        //! Shuts down delaythreads for this connection pool by underlying deactivate().
        //! The next dequeue attempt in the worker thread tasks will result in an error,
        //! ultimately ending the worker thread task.
        _queue->queue()->close();

        for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i)
        {
            T* t = _connections[IDX_ASYNC][i];
            DatabaseWorker* worker = t->m_worker;
            worker->wait();     //! Block until no more threads are running this task.
            delete worker;
            t->Close();         //! Closes the actualy MySQL connection.
        }

        SF_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.",
            GetDatabaseName());

        //! Shut down the synchronous connections
        //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
        //! should only be called after any other thread tasks in the core have exited,
        //! meaning there can be no concurrent access at this point.
        for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i)
            _connections[IDX_SYNCH][i]->Close();

        //! Deletes the ACE_Activation_Queue object and its underlying ACE_Message_Queue
        delete _queue;

        SF_LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName());
        delete _connectionInfo;
        _connectionInfo = NULL;
    }

    /**
        Delayed one-way statement methods.
    */

    //! Enqueues a one-way SQL operation in string format that will be executed asynchronously.
    //! This method should only be used for queries that are only executed once, e.g during startup.
    void Execute(const char* sql)
    {
        if (!sql)
            return;

        BasicStatementTask* task = new BasicStatementTask(sql);
        Enqueue(task);
    }

    //! Enqueues a one-way SQL operation in string format -with variable args- that will be executed asynchronously.
    //! This method should only be used for queries that are only executed once, e.g during startup.
    void PExecute(const char* sql, ...)
    {
        if (!sql)
            return;

        va_list ap;
        char szQuery[MAX_QUERY_LEN];
        va_start(ap, sql);
        vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
        va_end(ap);

        Execute(szQuery);
    }

    //! Enqueues a one-way SQL operation in prepared statement format that will be executed asynchronously.
    //! Statement must be prepared with CONNECTION_ASYNC flag.
    void Execute(PreparedStatement* stmt)
    {
        PreparedStatementTask* task = new PreparedStatementTask(stmt);
        Enqueue(task);
    }

    /**
        Direct synchronous one-way statement methods.
    */

    //! Directly executes a one-way SQL operation in string format, that will block the calling thread until finished.
    //! This method should only be used for queries that are only executed once, e.g during startup.
    void DirectExecute(const char* sql)
    {
        if (!sql)
            return;

        T* t = GetFreeConnection();
        t->Execute(sql);
        t->Unlock();
    }

    //! Directly executes a one-way SQL operation in string format -with variable args-, that will block the calling thread until finished.
    //! This method should only be used for queries that are only executed once, e.g during startup.
    void DirectPExecute(const char* sql, ...)
    {
        if (!sql)
            return;

        va_list ap;
        char szQuery[MAX_QUERY_LEN];
        va_start(ap, sql);
        vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
        va_end(ap);

        return DirectExecute(szQuery);
    }

    //! Directly executes a one-way SQL operation in prepared statement format, that will block the calling thread until finished.
    //! Statement must be prepared with the CONNECTION_SYNCH flag.
    void DirectExecute(PreparedStatement* stmt)
    {
        T* t = GetFreeConnection();
        t->Execute(stmt);
        t->Unlock();

        //! Delete proxy-class. Not needed anymore
        delete stmt;
    }

    /**
        Synchronous query (with resultset) methods.
    */

    //! Directly executes an SQL query in string format that will block the calling thread until finished.
    //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
    QueryResult Query(const char* sql, T* conn = NULL)
    {
        if (!conn)
            conn = GetFreeConnection();

        ResultSet* result = conn->Query(sql);
        conn->Unlock();
        if (!result || !result->GetRowCount())
        {
            delete result;
            return QueryResult(NULL);
        }

        result->NextRow();
        return QueryResult(result);
    }

    //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
    //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
    QueryResult PQuery(const char* sql, T* conn, ...)
    {
        if (!sql)
            return QueryResult(NULL);

        va_list ap;
        char szQuery[MAX_QUERY_LEN];
        va_start(ap, conn);
        vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
        va_end(ap);

        return Query(szQuery, conn);
    }

    //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
    //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
    QueryResult PQuery(const char* sql, ...)
    {
        if (!sql)
            return QueryResult(NULL);

        va_list ap;
        char szQuery[MAX_QUERY_LEN];
        va_start(ap, sql);
        vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
        va_end(ap);

        return Query(szQuery);
    }

    //! Directly executes an SQL query in prepared format that will block the calling thread until finished.
    //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
    //! Statement must be prepared with CONNECTION_SYNCH flag.
    PreparedQueryResult Query(PreparedStatement* stmt)
    {
        T* t = GetFreeConnection();
        PreparedResultSet* ret = t->Query(stmt);
        t->Unlock();

        //! Delete proxy-class. Not needed anymore
        delete stmt;

        if (!ret || !ret->GetRowCount())
        {
            delete ret;
            return PreparedQueryResult(NULL);
        }

        return PreparedQueryResult(ret);
    }

    /**
        Asynchronous query (with resultset) methods.
    */

    //! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed.
    //! The return value is then processed in ProcessQueryCallback methods.
    QueryResultFuture AsyncQuery(const char* sql)
    {
        QueryResultFuture res;
        BasicStatementTask* task = new BasicStatementTask(sql, res);
        Enqueue(task);
        return res;         //! Actual return value has no use yet
    }

    //! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed.
    //! The return value is then processed in ProcessQueryCallback methods.
    QueryResultFuture AsyncPQuery(const char* sql, ...)
    {
        va_list ap;
        char szQuery[MAX_QUERY_LEN];
        va_start(ap, sql);
        vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
        va_end(ap);

        return AsyncQuery(szQuery);
    }

    //! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed.
    //! The return value is then processed in ProcessQueryCallback methods.
    //! Statement must be prepared with CONNECTION_ASYNC flag.
    PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt)
    {
        PreparedQueryResultFuture res;
        PreparedStatementTask* task = new PreparedStatementTask(stmt, res);
        Enqueue(task);
        return res;
    }

    //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture
    //! return object as soon as the query is executed.
    //! The return value is then processed in ProcessQueryCallback methods.
    //! Any prepared statements added to this holder need to be prepared with the CONNECTION_ASYNC flag.
    QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder)
    {
        QueryResultHolderFuture res;
        SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res);
        Enqueue(task);
        return res;     //! Fool compiler, has no use yet
    }

    /**
        Transaction context methods.
    */

    //! Begins an automanaged transaction pointer that will automatically rollback if not commited. (Autocommit=0)
    SQLTransaction BeginTransaction()
    {
        return SQLTransaction(new Transaction);
    }

    //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
    //! were appended to the transaction will be respected during execution.
    void CommitTransaction(SQLTransaction transaction)
    {
#ifdef SKYFIRE_DEBUG
        //! Only analyze transaction weaknesses in Debug mode.
        //! Ideally we catch the faults in Debug mode and then correct them,
        //! so there's no need to waste these CPU cycles in Release mode.
        switch (transaction->GetSize())
        {
            case 0:
                SF_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
                return;
            case 1:
                SF_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
                break;
            default:
                break;
        }
#endif // SKYFIRE_DEBUG

        Enqueue(new TransactionTask(transaction));
    }

    //! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
    //! were appended to the transaction will be respected during execution.
    void DirectCommitTransaction(SQLTransaction& transaction)
    {
        T* con = GetFreeConnection();
        if (con->ExecuteTransaction(transaction))
        {
            con->Unlock();      // OK, operation succesful
            return;
        }

        //! Handle MySQL Errno 1213 without extending deadlock to the core itself
        /// @todo More elegant way
        if (con->GetLastError() == 1213)
        {
            uint8 loopBreaker = 5;
            for (uint8 i = 0; i < loopBreaker; ++i)
            {
                if (con->ExecuteTransaction(transaction))
                    break;
            }
        }

        //! Clean up now.
        transaction->Cleanup();

        con->Unlock();
    }

    //! Method used to execute prepared statements in a diverse context.
    //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
    void ExecuteOrAppend(SQLTransaction& trans, PreparedStatement* stmt)
    {
        if (trans.null())
            Execute(stmt);
        else
            trans->Append(stmt);
    }

    //! Method used to execute ad-hoc statements in a diverse context.
    //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
    void ExecuteOrAppend(SQLTransaction& trans, const char* sql)
    {
        if (trans.null())
            Execute(sql);
        else
            trans->Append(sql);
    }

    /**
        Other
    */

    //! Automanaged (internally) pointer to a prepared statement object for usage in upper level code.
    //! Pointer is deleted in this->DirectExecute(PreparedStatement*), this->Query(PreparedStatement*) or PreparedStatementTask::~PreparedStatementTask.
    //! This object is not tied to the prepared statement on the MySQL context yet until execution.
    PreparedStatement* GetPreparedStatement(uint32 index)
    {
        return new PreparedStatement(index);
    }

    //! Apply escape string'ing for current collation. (utf8)
    void EscapeString(std::string& str)
    {
        if (str.empty())
            return;

        char* buf = new char[str.size() * 2 + 1];
        EscapeString(buf, str.c_str(), str.size());
        str = buf;
        delete[] buf;
    }

    //! Keeps all our MySQL connections alive, prevent the server from disconnecting us.
    void KeepAlive()
    {
        //! Ping synchronous connections
        for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i)
        {
            T* t = _connections[IDX_SYNCH][i];
            if (t->LockIfReady())
            {
                t->Ping();
                t->Unlock();
            }
        }

        //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
        //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
        //! as the sole purpose is to prevent connections from idling.
        for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i)
            Enqueue(new PingOperation);
    }

private:
    unsigned long EscapeString(char* to, const char* from, unsigned long length)
    {
        if (!to || !from || !length)
            return 0;

        return mysql_real_escape_string(_connections[IDX_SYNCH][0]->GetHandle(), to, from, length);
    }

    void Enqueue(SQLOperation* op)
    {
        _queue->enqueue(op);
    }

    //! Gets a free connection in the synchronous connection pool.
    //! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks.
    T* GetFreeConnection()
    {
        uint8 i = 0;
        size_t num_cons = _connectionCount[IDX_SYNCH];
        T* t = NULL;
        //! Block forever until a connection is free
        for (;;)
        {
            t = _connections[IDX_SYNCH][++i % num_cons];
            //! Must be matched with t->Unlock() or you will get deadlocks
            if (t->LockIfReady())
                break;
        }

        return t;
    }

    char const* GetDatabaseName() const
    {
        return _connectionInfo->_database.c_str();
    }

private:
    enum _internalIndex
    {
        IDX_ASYNC,
        IDX_SYNCH,
        IDX_SIZE
    };

    ACE_Activation_Queue* _queue;             //! Queue shared by async worker threads.
    std::vector< std::vector<T*> >  _connections;
    uint32                          _connectionCount[2];       //! Counter of MySQL connections;
    MySQLConnectionInfo* _connectionInfo;
};

#endif
